This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Add http endpoint to communicate dispatcher readySubscriber status #344

Conversation

lberk
Member

@lberk lberk commented Jan 29, 2021

This is a WIP/PoC for one half of a more immediate fix for the
'readySubscriber' issues in the KafkaChannel. It will need some of
the work @devguyio is doing to lift the network prober into
knative/pkg; however, this is about as far as I can get until I know
more about that implementation.

/hold
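
For context, a minimal self-contained sketch (not this PR's actual code) of what a dispatcher-local endpoint reporting ready subscribers per channel could look like; the readinessServer type, its field names, and the :8081 port are illustrative assumptions:

package main

import (
	"encoding/json"
	"net/http"
	"sync"
)

// channelRef identifies a KafkaChannel by namespace/name (illustrative type).
type channelRef struct {
	Namespace string
	Name      string
}

// readinessServer reports which subscriptions currently have a consumer
// that is ready to dispatch, keyed by channel.
type readinessServer struct {
	mu               sync.RWMutex
	readySubscribers map[channelRef][]string // channel -> subscription UIDs
}

// ServeHTTP returns the ready-subscriber map as JSON so a controller
// could scrape it and set subscription readiness accordingly.
func (s *readinessServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	out := make(map[string][]string, len(s.readySubscribers))
	for ref, subs := range s.readySubscribers {
		out[ref.Namespace+"/"+ref.Name] = subs
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(out)
}

func main() {
	srv := &readinessServer{readySubscribers: map[channelRef][]string{}}
	// The port is an assumption; the PR exposes its own status port.
	_ = http.ListenAndServe(":8081", srv)
}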

@knative-prow-robot knative-prow-robot added do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. labels Jan 29, 2021
@google-cla google-cla bot added the cla: yes Indicates the PR's author has signed the CLA. label Jan 29, 2021
@knative-prow-robot knative-prow-robot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. approved Indicates a PR has been approved by an approver from all required OWNERS files. labels Jan 29, 2021
Contributor

@devguyio devguyio left a comment


Left a few comments; great work, Lukas!

pkg/channel/consolidated/dispatcher/dispatcher.go: 5 outdated review threads (resolved)
}
}
} else { //ensure the pointer is populated or things go boom
d.channelSubscriptions[channelRef] = &KafkaSubscription{
Contributor


I've a feeling this needs to be protected with a lock, wdyt?

Member Author


You mean like the outer lock d.consumerUpdateLock.Lock() on L236?
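
For illustration, a minimal self-contained sketch of the locking pattern discussed in this thread; the dispatcher and KafkaSubscription types and the string channel key are simplified assumptions, not the PR's actual definitions:

package main

import "sync"

// KafkaSubscription is a simplified stand-in for the dispatcher's real type.
type KafkaSubscription struct {
	subs []string
}

// dispatcher is a reduced sketch of the consolidated dispatcher's fields.
type dispatcher struct {
	consumerUpdateLock   sync.Mutex
	channelSubscriptions map[string]*KafkaSubscription
}

// addSubscription populates the map entry under the same outer lock
// referenced in the thread, so the pointer is always initialized before use
// and concurrent subscribe/unsubscribe calls cannot race on the same entry.
func (d *dispatcher) addSubscription(channelRef, subUID string) {
	d.consumerUpdateLock.Lock()
	defer d.consumerUpdateLock.Unlock()

	ks, ok := d.channelSubscriptions[channelRef]
	if !ok {
		ks = &KafkaSubscription{}
		d.channelSubscriptions[channelRef] = ks
	}
	ks.subs = append(ks.subs, subUID)
}

func main() {
	d := &dispatcher{channelSubscriptions: map[string]*KafkaSubscription{}}
	d.addSubscription("default/my-channel", "sub-uid-1")
}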

@slinkydeveloper
Contributor

/assign

return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	consumer.logger.Info(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset()))
	consumer.handler.SetReady(true)
Contributor


IMO this logic is not 100% right: there is no such "ready" state for a consumer group, there is only the concept of claims, i.e. consuming x partitions of y topics. I personally interpret the readiness of a KafkaChannel (a personal opinion, subject to change) as:

A KafkaChannel is ready whenever all of its consumer groups are consuming all partitions of the topic.

Hence I think the dispatcher should expose the topics/partitions map it is consuming, and the control plane should collect that and, checking it against the topic information from Kafka, figure out whether all partitions are being consumed. See https://github.com/knative-sandbox/eventing-kafka/pull/328/files#diff-318b63710b5768647bfeb5b9bca7a04b80f3a8388439a6b8bc50eddfb8fed8e7R33 for more details.

The code here, on the other hand, assumes that whenever we start the consumer group it will get a claim for all the partitions of the subscribed topic. That is eventually true as long as there is only one consumer in the consumer group, i.e. only one dispatcher is actually consuming the topic. This might not be the case if you increase the replicas of the dispatcher pod without HA enabled (i.e. everybody consumes everything).

Another side note: IMO you should set this readiness state in the Setup function of the consumer group.
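
For illustration, a small self-contained sketch of the readiness check described above (a subscription is ready only when every partition of its topic is being consumed); the claimedPartitions bookkeeping is an assumed data structure, not the dispatcher's actual one:

package main

// allPartitionsConsumed reports whether every partition of a topic has a
// consumer claiming it, which is the readiness definition proposed above.
func allPartitionsConsumed(claimedPartitions map[int32]bool, totalPartitions int) bool {
	if len(claimedPartitions) < totalPartitions {
		return false
	}
	for p := int32(0); p < int32(totalPartitions); p++ {
		if !claimedPartitions[p] {
			return false
		}
	}
	return true
}

func main() {
	claims := map[int32]bool{0: true, 1: true}
	_ = allPartitionsConsumed(claims, 3) // false: partition 2 has no consumer yet
}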

Contributor


Two points:

  • I first tried setting it in the Setup function in my PoC, and there's still a gap between events being received and the actual consumer dispatching messages. So the bug is ONLY solved when the subscription is added to the ready subscribers as a result of the first call to ConsumeClaim.
  • About the ready:
    • This is definitely our own concept. It signifies that the "dispatching routine" is ready; naming is irrelevant. We're marking that this dispatcher has a goroutine that's ready to dispatch messages from that "claim".
    • IMHO, from a "subscriber" POV, readiness of the subscription means that at least one consumer has joined that consumer group, regardless of partition assignment. Partition assignment is the Kafka broker's concern: eventually a rebalance will kick in, unassigned partitions will be reassigned, and messages since the last commit will be dispatched. So the Kafka controller should only worry about whether there is at least one "consumer" dispatching for that "consumer group". It shouldn't even worry if that sole consumer later dies, as long as the consumer group exists: the sender should keep sending, and HA of the "consumers" is a Kafka broker concern; once they are back and a rebalance kicks in, all messages sent since the last commit will be delivered, because the consumer group already existed.

Contributor


I first tried setting it in the Setup function in my PoC, and there's still a gap between events being received and the actual consumer dispatching messages. So the bug is ONLY solved when the subscription is added to the ready subscribers as a result of the first call to ConsumeClaim.

OK, I guess that was a misunderstanding of Sarama on my side.

readiness of the subscription means that at least one consumer has joined that consumer group, regardless of partition assignment

I'm not talking about listening for the partition assignment; I'm talking about flagging ready when all partitions of a topic are effectively consumed, i.e. there is a consumer consuming each of them. Otherwise, status ready just means "the consumer group has been created", because when you create the consumer group the broker sets up the offsets for you.

Contributor


I think we need to distinguish between Channel readiness and Subscription readiness.

IMO, a channel is ready when it is ready to receive events, independently of the status of its subscribers. The reason is that subscribers come and go; a subscriber being created or deleted does not mean the channel cannot receive events.

Regarding subscription readiness, IMHO, it must provide the following guarantee:

After a subscription is marked ready, all events sent to the channel are guaranteed to be forwarded to its subscriber. The subscriber MAY receive events before being marked as ready.

So from a technical PoV I agree with @slinkydeveloper:

A subscription is ready whenever its consumer group is consuming all partitions of the topic.

A partition is considered to be consumed when its initial offset has been resolved, which happens during the first fetch call. In order not to miss an event, the offsets of all partitions must be resolved.

Note that if the dispatcher crashes between the first fetch call and committing the offset, the guarantee expressed above is broken. If we want to provide a strong guarantee, we should mark the subscription ready only when all partition offsets have been committed at least once.
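
For illustration, a minimal sketch of the stronger variant described in the last paragraph (mark ready only once every partition has committed at least once); the readyTracker type is an assumption, not code from this PR:

package main

import "sync"

// readyTracker flips to ready once every partition of the topic has
// committed an offset at least once.
type readyTracker struct {
	mu              sync.Mutex
	totalPartitions int
	committed       map[int32]bool
}

// markCommitted records a committed partition and reports whether all
// partitions have now committed at least once.
func (t *readyTracker) markCommitted(partition int32) (ready bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.committed[partition] = true
	return len(t.committed) == t.totalPartitions
}

func main() {
	t := &readyTracker{totalPartitions: 2, committed: map[int32]bool{}}
	t.markCommitted(0)          // not yet ready
	ready := t.markCommitted(1) // now ready
	_ = ready
}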

Contributor


IMO, a channel is ready when it is ready to receive events, independently of the status of its subscribers. The reason is that subscribers come and go; a subscriber being created or deleted does not mean the channel cannot receive events.

👍

Note that if the dispatcher crashes between the first fetch call and committing the offset, the guarantee expressed above is broken. If we want to provide a strong guarantee, we should mark the subscription ready only when all partition offsets have been committed at least once.

I believe we don't need that (you can't commit if you're never ready 😄), but in general 👍 with your readiness definition.

Member Author


Thanks for the feedback. I've updated where we call SetReady: it is now set to true after we handle a message from the channel, and to false if there was an error.

pkg/channel/consolidated/dispatcher/dispatcher.go: outdated review thread (resolved)
@knative-prow-robot knative-prow-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Feb 2, 2021
@devguyio
Contributor

devguyio commented Feb 2, 2021

@lberk Do we also need some changes to the dispatcher yaml to expose the port?

@devguyio
Contributor

devguyio commented Feb 2, 2021

/assign

@lberk lberk force-pushed the consolidatedKafkaChannelDispatcherRework branch from 55231cb to 1159051 Compare February 3, 2021 01:31
@lberk lberk force-pushed the consolidatedKafkaChannelDispatcherRework branch from caf4240 to b183732 Compare February 3, 2021 17:12
@knative-prow-robot knative-prow-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Feb 3, 2021
@codecov

codecov bot commented Feb 3, 2021

Codecov Report

Merging #344 (018bf28) into master (ffd2397) will increase coverage by 0.08%.
The diff coverage is 79.77%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #344      +/-   ##
==========================================
+ Coverage   75.17%   75.26%   +0.08%     
==========================================
  Files         124      124              
  Lines        4705     4782      +77     
==========================================
+ Hits         3537     3599      +62     
- Misses        948      961      +13     
- Partials      220      222       +2     
Impacted Files Coverage Δ
pkg/source/adapter/adapter.go 66.10% <0.00%> (-1.14%) ⬇️
pkg/source/mtadapter/adapter.go 70.93% <72.00%> (+2.67%) ⬆️
pkg/channel/consolidated/dispatcher/dispatcher.go 55.29% <81.81%> (+6.17%) ⬆️
pkg/common/consumer/consumer_factory.go 78.78% <100.00%> (+3.03%) ⬆️
pkg/common/consumer/consumer_handler.go 83.33% <100.00%> (+2.38%) ⬆️
...rce/reconciler/source/resources/receive_adapter.go 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ffd2397...a6fb63b. Read the comment docs.

@lberk lberk changed the title WIP Add http endpoint to communicate dispatcher readySubscriber status Add http endpoint to communicate dispatcher readySubscriber status Feb 3, 2021
@knative-prow-robot knative-prow-robot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Feb 3, 2021
@lberk
Member Author

lberk commented Feb 3, 2021

/unhold
I've added back ObserveKind so that there should be no functional changes if/when this merges. Once @devguyio posts his controller status PR, we can remove ObserveKind there.

@knative-prow-robot knative-prow-robot removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Feb 3, 2021
@knative-metrics-robot

The following is the coverage report on the affected files.
Say /test pull-knative-sandbox-eventing-kafka-go-coverage to re-run this coverage report

File Old Coverage New Coverage Delta
pkg/channel/consolidated/dispatcher/dispatcher.go 62.9% 67.2% 4.3
pkg/common/consumer/consumer_handler.go 89.5% 90.9% 1.4

@slinkydeveloper
Contributor

/lgtm
/approve

I think we're good to go; when @devguyio glues the code together with the control plane bits, we'll figure out the remaining details.

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Feb 4, 2021
@knative-prow-robot
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: lberk, slinkydeveloper

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:
  • OWNERS [lberk,slinkydeveloper]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow-robot knative-prow-robot merged commit 67919f1 into knative-extensions:master Feb 4, 2021
@@ -83,13 +85,15 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
		if err != nil {
			consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))
			consumer.errors <- err
			consumer.handler.SetReady(false)
Contributor


If we go with the proposed definition for when a subscriber is ready, this is not needed.

		}

		if mustMark {
			session.MarkMessage(message, "") // Mark kafka message as processed
			if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil {
				consumer.logger.Debugw("Message marked", zap.String("topic", message.Topic), zap.Binary("value", message.Value))
			}
			consumer.handler.SetReady(true)
Contributor


Same. This should move out of the condition.
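
For illustration, a self-contained sketch of the suggestion in these two review comments (mark ready after a message is handled, outside the mustMark condition); the handler interface and consumeOne function are simplified stand-ins, not the PR's actual code:

package main

import "fmt"

// handler is a stand-in for the dispatcher's consumer handler; only the
// two methods relevant to this thread are modeled.
type handler interface {
	Handle(msg string) (mustMark bool, err error)
	SetReady(ready bool)
}

// consumeOne marks the handler ready after a message is handled
// successfully, regardless of whether the offset gets marked (mustMark).
// Per the comment above, no SetReady(false) is issued on error.
func consumeOne(h handler, msg string, mark func(string)) error {
	mustMark, err := h.Handle(msg)
	if err != nil {
		return err
	}
	if mustMark {
		mark(msg) // mark kafka message as processed
	}
	h.SetReady(true) // moved out of the mustMark condition
	return nil
}

type noopHandler struct{}

func (noopHandler) Handle(msg string) (bool, error) { return true, nil }
func (noopHandler) SetReady(ready bool)             { fmt.Println("ready:", ready) }

func main() {
	_ = consumeOne(noopHandler{}, "event-1", func(string) {})
}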

devguyio pushed a commit to devguyio/eventing-kafka that referenced this pull request Mar 25, 2021
…native-extensions#344)

* Groundwork for dispatcher subscriber status w/ consumers

* Add functionality to serve http of subscribers

* Drop callback functions

* Fix failing unit test, add unsub check for chanref

* Rework http handler to be dispatcher local (not kafkasubscription)

* Variable typo fix

* Fix copyright years

* Change header name to constant

* Move subscription handler to its own ServeHTTP func

* Remove channelRef in KafkaSubscription

* Change bad channelref request to http.StatusNotFound

* Add namespace to subscriptions http output

* Add Unit tests for servehttp & setready

* Split uriSplit into channelRefName{,space} vars

* Expose dispatcher http-sub-status port in disatcher svc

* Add servehttp diagnostic messages

* One more uriSplit -> channelRefName variable rename

* Change how we write the http response

* Add empty SetReady() method to source RA

* Fix consumer_handler_test

* more linting

* Add back ObserveKind until controller implements substatus scraper

* Add more ServeHTTP unit tests

* slightly alter where we mark a handler as ready or not
matzew pushed a commit to matzew/eventing-kafka that referenced this pull request Mar 25, 2021
knative-prow-robot pushed a commit that referenced this pull request Mar 25, 2021
* Add http endpoint to communicate dispatcher readySubscriber status (#344)

* Add Subscription prober (#433)

* Add Subscription prober

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Fix endpoints informer in cons. KafkaChannel controller

* Fix unittests after adding status prober

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Format and order go imports in cons. channel controller

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Rename import alias and remove unused variable

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Add dispatcher prober test for tesitng a single pod

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Support probing dispatchers for multiple partitions kafka channels

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Update deps

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Fix conumer handler test

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* remove unused hashes from status probing test

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Apply review comments and add a prober test

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Remove old comment

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Fix fake status manager

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Return error if IsReady returns an error

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Change probing to be partition based and fix some corner cases of channel deletion

* Change cleanup logic to clean ready subscriptions only

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Remove cleanup to avaid consumers race

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Do not test controller name from generated source which can change. (#320)

* Update codegen

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

Co-authored-by: Lukas Berk <lberk@redhat.com>
Co-authored-by: Travis Minke <travis.minke@sap.com>
matzew pushed a commit to matzew/eventing-kafka that referenced this pull request Mar 25, 2021
matzew pushed a commit to matzew/eventing-kafka that referenced this pull request Mar 26, 2021
openshift-merge-robot referenced this pull request in openshift-knative/eventing-kafka Mar 26, 2021
matzew added a commit to matzew/eventing-kafka that referenced this pull request Sep 24, 2021